package com.isyscore.os.metadata.kettle.base;

import com.google.common.collect.Lists;
import com.isyscore.os.core.exception.DataFactoryException;
import com.isyscore.os.core.exception.ErrorCode;
import com.isyscore.os.metadata.kettle.vis.VisNode;
import lombok.extern.slf4j.Slf4j;
import org.pentaho.di.base.AbstractMeta;
import org.pentaho.di.core.logging.StepLogTable;
import org.pentaho.di.core.logging.TransLogTable;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.w3c.dom.Document;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component(GraphCodec.TRANS_CODEC)
@Slf4j
//@Scope("prototype")
public class FlowMetaCodec extends BaseGraphCodec {
    String FLOW_LOG_CONNECTION_NAME = Long.toString(Long.MAX_VALUE);
    Long FLOW_LOG_CONNECTION_ID = Long.MAX_VALUE;
    String KETTLE_TRANS_LOG_TABLE = "kettle_trans_log_all";
    String KETTLE_STEP_LOG_TABLE = "kettle_step_log_all";
    @Autowired
    private PluginFactory factory;

//	@Autowired
//	private FlowParser flowParser;

    @Autowired
    private FlowLogDbConfiguration flowLogDbConfiguration;

    @Override
    public AbstractMeta decode(String xml) {
        TransMeta transMeta = new TransMeta();
        try {
            Document document = XMLHandler.loadXMLString(xml);
            transMeta.loadXML(
                    document.getDocumentElement(), "", null, null, true, null, null);
        } catch (Exception e) {
            log.error("xml解析失败{}", e);
            throw new DataFactoryException(ErrorCode.KETTLE_ETL_CREATE_FAIL);
        }
        //这里这样做是为了让同一个定义的流程每次运行的时候使用不同的通道。
        transMeta.setName(transMeta.getName() + "#" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
        return transMeta;
    }


    @Override
    public AbstractMeta decode(FlowConfig transGraph, boolean doAfter) throws Exception {
        TransMeta transMeta = new TransMeta();
        //构建Info
        encodeInfo(transMeta, transGraph.getTransInfo());

        //构建DB链接
        decodeDatabases(transGraph.getTransConnections(), transMeta);

        // 构建 steps
        for (VisNode node : transGraph.getSteps()) {
            Step step = factory.getPlugin(node.getNodeType()).newInstance();
            StepMeta stepMeta = step.decode(node, transMeta.getDatabases(), transMeta, transGraph);
            stepMeta.setParentTransMeta(transMeta);
            transMeta.addStep(stepMeta);
        }
        //构建lop
        for (FlowHup transHup : transGraph.getTransHups()) {
            TransHopMeta hopinf = new TransHopMeta(null, null, true);
            String[] stepNames = transMeta.getStepNames();
            for (int j = 0; j < stepNames.length; j++) {
                if (stepNames[j].equalsIgnoreCase(transHup.getFrom()))
                    hopinf.setFromStep(transMeta.getStep(j));
                if (stepNames[j].equalsIgnoreCase(transHup.getTo()))
                    hopinf.setToStep(transMeta.getStep(j));
            }
            transMeta.addTransHop(hopinf);
        }


        if (doAfter) {
            for (VisNode node : transGraph.getSteps()) {
                Step step = factory.getPlugin(node.getNodeType()).newInstance();
                step.after(node, transMeta.getDatabases(), transMeta, transGraph);
            }

        }

        //设置日志输出配置
        //绑定日志输出源
        FlowConnection logConnection = new FlowConnection().setName(FLOW_LOG_CONNECTION_NAME)
                .setId(FLOW_LOG_CONNECTION_ID)
                .setDatabase(flowLogDbConfiguration.getDatabase())
                .setServer(flowLogDbConfiguration.getHost())
                .setType(flowLogDbConfiguration.getDbType().getName())
                .setPort(flowLogDbConfiguration.getPort())
                .setUsername(flowLogDbConfiguration.getUsername())
                .setPassword(flowLogDbConfiguration.getPassword());
        decodeDatabases(Lists.newArrayList(logConnection), transMeta);

        //设置日志输出参数
        TransLogTable transLogTable = transMeta.getTransLogTable();
        transLogTable.setConnectionName(FLOW_LOG_CONNECTION_NAME+"-"+flowLogDbConfiguration.getDatabase());
        transLogTable.setTableName(KETTLE_TRANS_LOG_TABLE);

        StepLogTable stepLogTable = transMeta.getStepLogTable();
        stepLogTable.setConnectionName(FLOW_LOG_CONNECTION_NAME+"-"+flowLogDbConfiguration.getDatabase());
        stepLogTable.setTableName(KETTLE_STEP_LOG_TABLE);

        //设置处理了多少条日志后反馈一条日志
        int logSize = transGraph.getOtherParam()==null?200:(transGraph.getOtherParam().containsKey("logSize")? Integer.parseInt(transGraph.getOtherParam().get("logSize")) :200);
        transMeta.setFeedbackSize(logSize);
        return transMeta;
    }


}
